{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Using Tensorflow DALI plugin: DALI and tf.data\n",
    "\n",
    "### Overview\n",
    "\n",
    "DALI offers integration with [tf.data API](https://www.tensorflow.org/guide/data). Using this approach you can easily connect DALI pipeline with various TensorFlow APIs and use it as a data source for your model. This tutorial shows how to do it using well known [MNIST](http://yann.lecun.com/exdb/mnist/) converted to LMDB format. You can find it in [DALI_extra](https://github.com/NVIDIA/DALI_extra) - DALI test data repository.\n",
    "\n",
    "We start with creating a DALI pipeline to read, decode and normalize MNIST images and read corresponding labels.\n",
    "\n",
    "`DALI_EXTRA_PATH` environment variable should point to the place where data from [DALI extra repository](https://github.com/NVIDIA/DALI_extra) is downloaded. Please make sure that the proper release tag is checked out."
   ]
  },
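  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Because the data path below is built from `DALI_EXTRA_PATH`, an unset variable surfaces only as a bare `KeyError`. The following is a minimal sketch, using only the standard library, of a more descriptive check. The helper name `mnist_training_path` is hypothetical and not part of DALI.\n",
    "\n",
    "```python\n",
    "import os\n",
    "\n",
    "\n",
    "def mnist_training_path(env=None):\n",
    "    # Hypothetical helper: builds the MNIST LMDB path used in this tutorial.\n",
    "    env = os.environ if env is None else env\n",
    "    root = env.get('DALI_EXTRA_PATH')\n",
    "    if not root:\n",
    "        raise RuntimeError(\n",
    "            'DALI_EXTRA_PATH is not set; point it at a checkout of DALI_extra.')\n",
    "    return os.path.join(root, 'db/MNIST/training/')\n",
    "\n",
    "\n",
    "# Example with an explicit mapping instead of the real environment\n",
    "print(mnist_training_path({'DALI_EXTRA_PATH': '/data/DALI_extra'}))\n",
    "```\n",
    "\n",
    "Passing a mapping explicitly keeps the check easy to try out without touching the real environment."
   ]
  },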
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "import nvidia.dali as dali\n",
    "from nvidia.dali.pipeline import Pipeline\n",
    "import nvidia.dali.ops as ops\n",
    "import nvidia.dali.types as types\n",
    "\n",
    "import os\n",
    "\n",
    "# Path to MNIST dataset\n",
    "data_path = os.path.join(os.environ['DALI_EXTRA_PATH'], 'db/MNIST/training/')\n",
    "\n",
    "\n",
    "class MnistPipeline(Pipeline):\n",
    "    def __init__(self, batch_size, device, device_id=0, num_threads=4, seed=0):\n",
    "        super(MnistPipeline, self).__init__(\n",
    "            batch_size, num_threads, device_id, seed)\n",
    "        self.device = device\n",
    "        self.reader = ops.Caffe2Reader(path=data_path, random_shuffle=True)\n",
    "        self.decode = ops.ImageDecoder(\n",
    "            device='mixed' if device is 'gpu' else 'cpu',\n",
    "            output_type=types.GRAY)\n",
    "        self.cmn = ops.CropMirrorNormalize(\n",
    "            device=device,\n",
    "            output_dtype=types.FLOAT,\n",
    "            image_type=types.GRAY,\n",
    "            std=[255.],\n",
    "            output_layout=\"CHW\")\n",
    "\n",
    "    def define_graph(self):\n",
    "        inputs, labels = self.reader(name=\"Reader\")\n",
    "        images = self.decode(inputs)\n",
    "        if self.device is 'gpu':\n",
    "            labels = labels.gpu()\n",
    "        images = self.cmn(images)\n",
    "\n",
    "        return (images, labels)"
   ]
  },
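  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "With `std=[255.]` and no explicit `mean`, the normalization step amounts to dividing each pixel by 255, assuming `CropMirrorNormalize` computes `(x - mean) / std` with a default mean of 0. A plain-Python sketch of that arithmetic follows; the `normalize` function is illustrative and is not a DALI call.\n",
    "\n",
    "```python\n",
    "def normalize(pixels, mean=0.0, std=255.0):\n",
    "    # Illustrative stand-in for the per-pixel arithmetic: (x - mean) / std\n",
    "    return [(p - mean) / std for p in pixels]\n",
    "\n",
    "\n",
    "# 8-bit grayscale values are mapped into the range [0, 1]\n",
    "normalized = normalize([0, 128, 255])\n",
    "print(normalized)\n",
    "```"
   ]
  },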
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now we define some parameters of the training:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "BATCH_SIZE = 64\n",
    "DROPOUT = 0.2\n",
    "IMAGE_SIZE = 28\n",
    "NUM_CLASSES = 10\n",
    "HIDDEN_SIZE = 128\n",
    "EPOCHS = 5\n",
    "ITERATIONS_PER_EPOCH = 100"
   ]
  },
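  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "These parameters determine how much data the training sees. Here is a quick back-of-the-envelope calculation, assuming the LMDB holds the standard 60,000 MNIST training images. The `MNIST_TRAIN_SIZE` constant is an assumption and is not read from the dataset.\n",
    "\n",
    "```python\n",
    "BATCH_SIZE = 64\n",
    "EPOCHS = 5\n",
    "ITERATIONS_PER_EPOCH = 100\n",
    "MNIST_TRAIN_SIZE = 60000  # assumption: size of the standard MNIST training split\n",
    "\n",
    "samples_per_epoch = BATCH_SIZE * ITERATIONS_PER_EPOCH\n",
    "total_samples = samples_per_epoch * EPOCHS\n",
    "fraction_per_epoch = samples_per_epoch / MNIST_TRAIN_SIZE\n",
    "\n",
    "print(samples_per_epoch, total_samples, round(fraction_per_epoch, 3))\n",
    "```\n",
    "\n",
    "Under that assumption, each epoch as defined by `ITERATIONS_PER_EPOCH` covers roughly a tenth of the dataset rather than a full pass."
   ]
  },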
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Next step is to wrap an instance of `MnistPipeline` with a `DALIDataset` object from DALI TensorFlow plugin. This class is compatible with `tf.data.Dataset`. Other parameters are shapes and types of the outputs of the pipeline. Here we return images and labels. It means we have two outputs one of type `tf.float32` for images and on of type `tf.int32` for labels."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "import nvidia.dali.plugin.tf as dali_tf\n",
    "import tensorflow.compat.v1 as tf\n",
    "tf.logging.set_verbosity(tf.logging.ERROR)\n",
    "tf.disable_eager_execution()\n",
    "\n",
    "\n",
    "# Create pipeline\n",
    "mnist_pipeline = MnistPipeline(BATCH_SIZE, device='cpu', device_id=0)\n",
    "\n",
    "# Define shapes and types of the outputs\n",
    "shapes = [\n",
    "    (BATCH_SIZE, IMAGE_SIZE, IMAGE_SIZE),\n",
    "    (BATCH_SIZE)]\n",
    "dtypes = [\n",
    "    tf.float32,\n",
    "    tf.int32]\n",
    "\n",
    "# Create dataset\n",
    "mnist_set = dali_tf.DALIDataset(\n",
    "    pipeline=mnist_pipeline,\n",
    "    batch_size=BATCH_SIZE,\n",
    "    shapes=shapes,\n",
    "    dtypes=dtypes,\n",
    "    device_id=0)"
   ]
  },
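  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "When writing shape specifications like the ones above, keep in mind that in Python parentheses alone do not create a tuple; a trailing comma does. A small, framework-independent sketch of the difference follows. The `num_elements` helper is illustrative only.\n",
    "\n",
    "```python\n",
    "BATCH_SIZE = 64\n",
    "IMAGE_SIZE = 28\n",
    "\n",
    "scalar_like = (BATCH_SIZE)   # just the int 64; parentheses alone make no tuple\n",
    "one_dim = (BATCH_SIZE,)      # a 1-tuple describing a 1-D batch of 64 labels\n",
    "image_shape = (BATCH_SIZE, IMAGE_SIZE, IMAGE_SIZE)\n",
    "\n",
    "\n",
    "def num_elements(shape):\n",
    "    # Product of all dimensions in a shape tuple\n",
    "    count = 1\n",
    "    for dim in shape:\n",
    "        count *= dim\n",
    "    return count\n",
    "\n",
    "\n",
    "print(type(scalar_like).__name__, one_dim, num_elements(image_shape))\n",
    "```\n",
    "\n",
    "Whether a bare integer is accepted where a shape is expected depends on the consumer, so the explicit one-element tuple is the less ambiguous spelling."
   ]
  },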
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We are ready to start the training. Following sections show how to do it with different APIs availible in TensorFlow.\n",
    "\n",
    "### Keras\n",
    "\n",
    "First, we pass `mnist_set` to model created with `tf.keras` and use `model.fit` method to train it."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Train on 100 steps\n",
      "Epoch 1/5\n",
      "100/100 [==============================] - 1s 6ms/step - loss: 0.8921 - accuracy: 0.7462\n",
      "Epoch 2/5\n",
      "100/100 [==============================] - 0s 4ms/step - loss: 0.4115 - accuracy: 0.8847\n",
      "Epoch 3/5\n",
      "100/100 [==============================] - 0s 3ms/step - loss: 0.3235 - accuracy: 0.9062\n",
      "Epoch 4/5\n",
      "100/100 [==============================] - 0s 4ms/step - loss: 0.2926 - accuracy: 0.9202\n",
      "Epoch 5/5\n",
      "100/100 [==============================] - 0s 4ms/step - loss: 0.2617 - accuracy: 0.9245\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "<tensorflow.python.keras.callbacks.History at 0x7fc3a4955518>"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Create the model\n",
    "model = tf.keras.models.Sequential([\n",
    "    tf.keras.layers.Input(shape=(IMAGE_SIZE, IMAGE_SIZE), name='images'),\n",
    "    tf.keras.layers.Flatten(input_shape=(IMAGE_SIZE, IMAGE_SIZE)),\n",
    "    tf.keras.layers.Dense(HIDDEN_SIZE, activation='relu'),\n",
    "    tf.keras.layers.Dropout(DROPOUT),\n",
    "    tf.keras.layers.Dense(NUM_CLASSES, activation='softmax')])\n",
    "model.compile(\n",
    "    optimizer='adam',\n",
    "    loss='sparse_categorical_crossentropy',\n",
    "    metrics=['accuracy'])\n",
    "\n",
    "# Train using DALI dataset\n",
    "model.fit(\n",
    "    mnist_set,\n",
    "    epochs=EPOCHS,\n",
    "    steps_per_epoch=ITERATIONS_PER_EPOCH)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As you can see, it was very easy to integrate DALI pipeline with `tf.keras` API.\n",
    "\n",
    "The code above performed the training using the CPU. Both the DALI pipeline and the model were using the CPU.\n",
    "\n",
    "We can easily move the whole processing to the GPU. First, we create a pipeline that uses the GPU with ID = 0. Next we place both the DALI dataset and the model on the same GPU."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Create pipeline\n",
    "mnist_pipeline = MnistPipeline(BATCH_SIZE, device='gpu', device_id=0)\n",
    "\n",
    "# Define the model and place it on the GPU\n",
    "with tf.device('/gpu:0'):\n",
    "    mnist_set = dali_tf.DALIDataset(\n",
    "        pipeline=mnist_pipeline,\n",
    "        batch_size=BATCH_SIZE,\n",
    "        shapes=shapes,\n",
    "        dtypes=dtypes,\n",
    "        device_id=0)\n",
    "    model = tf.keras.models.Sequential([\n",
    "        tf.keras.layers.Input(shape=(IMAGE_SIZE, IMAGE_SIZE), name='images'),\n",
    "        tf.keras.layers.Flatten(input_shape=(IMAGE_SIZE, IMAGE_SIZE)),\n",
    "        tf.keras.layers.Dense(HIDDEN_SIZE, activation='relu'),\n",
    "        tf.keras.layers.Dropout(DROPOUT),\n",
    "        tf.keras.layers.Dense(NUM_CLASSES, activation='softmax')])\n",
    "    model.compile(\n",
    "        optimizer='adam',\n",
    "        loss='sparse_categorical_crossentropy',\n",
    "        metrics=['accuracy'])\n",
    "    "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We move the training to the GPU as well. This allows TensorFlow to pick up GPU instance of DALI dataset."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {
    "scrolled": true
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Train on 100 steps\n",
      "Epoch 1/5\n",
      "100/100 [==============================] - 1s 7ms/step - loss: 0.9235 - accuracy: 0.7381\n",
      "Epoch 2/5\n",
      "100/100 [==============================] - 1s 6ms/step - loss: 0.4115 - accuracy: 0.8856\n",
      "Epoch 3/5\n",
      "100/100 [==============================] - 1s 6ms/step - loss: 0.3243 - accuracy: 0.9050\n",
      "Epoch 4/5\n",
      "100/100 [==============================] - 0s 5ms/step - loss: 0.2932 - accuracy: 0.9166\n",
      "Epoch 5/5\n",
      "100/100 [==============================] - 1s 7ms/step - loss: 0.2606 - accuracy: 0.9212\n"
     ]
    }
   ],
   "source": [
    "# Train on the GPU\n",
    "with tf.device('/gpu:0'):\n",
    "    model.fit(\n",
    "        mnist_set,\n",
    "        epochs=EPOCHS,\n",
    "        steps_per_epoch=ITERATIONS_PER_EPOCH)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "It is important to note here, that there is no intermediate CPU buffer between DALI and TensorFlow in the execution above. DALI GPU outputs are copied straight to TF GPU Tensors used by the model.\n",
    "\n",
    "In this particular toy example performance of the GPU variant is lower than the CPU one. The MNIST images are small and nvJPEG decoder used in the GPU DALI pipeline to decode them is not well suited for such circumstance. We use it here to show how to integrate it properly in the real life case."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "\n",
    "\n",
    "### Estimators\n",
    "\n",
    "Another popular TensorFlow API is `tf.estimator` API. This section shows how to use DALI dataset as a data source for model based on this API. \n",
    "\n",
    "First we create the model."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Define the feature columns for Estimator\n",
    "feature_columns = [tf.feature_column.numeric_column(\n",
    "    \"images\", shape=[IMAGE_SIZE, IMAGE_SIZE])]\n",
    "\n",
    "# And the run config\n",
    "run_config = tf.estimator.RunConfig(\n",
    "    model_dir='/tmp/tensorflow-checkpoints',\n",
    "    device_fn=lambda op: '/gpu:0')\n",
    "\n",
    "# Finally create the model based on `DNNClassifier`\n",
    "model = tf.estimator.DNNClassifier(\n",
    "    feature_columns=feature_columns,\n",
    "    hidden_units=[HIDDEN_SIZE],\n",
    "    n_classes=NUM_CLASSES,\n",
    "    dropout=DROPOUT,\n",
    "    config=run_config,\n",
    "    optimizer='Adam')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "In `tf.estimator` API data is passed to the model with the function returning the dataset. We define this function to return DALI dataset placed on the GPU."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [],
   "source": [
    "def train_data_fn():\n",
    "    with tf.device('/gpu:0'):\n",
    "        mnist_pipeline = MnistPipeline(BATCH_SIZE, device='gpu', device_id=0)\n",
    "        mnist_set = dali_tf.DALIDataset(\n",
    "            pipeline=mnist_pipeline,\n",
    "            batch_size=BATCH_SIZE,\n",
    "            shapes=shapes,\n",
    "            dtypes=dtypes,\n",
    "            device_id=0)\n",
    "        mnist_set = mnist_set.map(\n",
    "            lambda features, labels: ({'images': features}, labels))\n",
    "        \n",
    "    return mnist_set"
   ]
  },
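  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The `map` call above exists because the feature column was declared under the key `images`, so each batch has to arrive as a dictionary with that key. Below is a plain-Python sketch of the same transformation, with lists standing in for tensors. The name `to_estimator_input` is hypothetical.\n",
    "\n",
    "```python\n",
    "def to_estimator_input(features, labels):\n",
    "    # Wrap the image batch in a dict keyed by the feature column name\n",
    "    return {'images': features}, labels\n",
    "\n",
    "\n",
    "# Lists stand in for the image and label tensors of one batch\n",
    "batch_images = [[0.0, 0.5], [1.0, 0.25]]\n",
    "batch_labels = [7, 3]\n",
    "features, labels = to_estimator_input(batch_images, batch_labels)\n",
    "print(sorted(features.keys()), labels)\n",
    "```"
   ]
  },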
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "With everything set up we are ready to run the training."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "<tensorflow_estimator.python.estimator.canned.dnn.DNNClassifier at 0x7fc3a6ad8b70>"
      ]
     },
     "execution_count": 9,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Running the training on the GPU\n",
    "model.train(input_fn=train_data_fn, steps=EPOCHS * ITERATIONS_PER_EPOCH)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "{'accuracy': 0.87921876,\n",
       " 'average_loss': 0.49361104,\n",
       " 'loss': 31.591106,\n",
       " 'global_step': 17500}"
      ]
     },
     "execution_count": 10,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "model.evaluate(input_fn=train_data_fn, steps=ITERATIONS_PER_EPOCH)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Custom models and training loops\n",
    "\n",
    "Finally, the last part of this tutorial focuses on integrating DALI dataset with custom models and training loops. A complete example below shows from start to finish how to use DALI dataset with native TensorFlow model and run training using `tf.Session`.\n",
    "\n",
    "First step is to define the model and the dataset and place both on the GPU."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [],
   "source": [
    "tf.reset_default_graph()\n",
    "\n",
    "options = tf.data.Options()\n",
    "options.experimental_optimization.apply_default_optimizations = False\n",
    "options.experimental_optimization.autotune = False\n",
    "\n",
    "\n",
    "with tf.device('/gpu:0'):\n",
    "    mnist_set = dali_tf.DALIDataset(\n",
    "        pipeline=MnistPipeline(BATCH_SIZE, device='gpu', device_id=0),\n",
    "        batch_size=BATCH_SIZE,\n",
    "        shapes=shapes,\n",
    "        dtypes=dtypes,\n",
    "        device_id=0).with_options(options)\n",
    "\n",
    "    iterator = tf.data.make_initializable_iterator(mnist_set)\n",
    "    images, labels = iterator.get_next()\n",
    "\n",
    "    labels = tf.reshape(\n",
    "        tf.one_hot(labels, NUM_CLASSES),\n",
    "        [BATCH_SIZE, NUM_CLASSES])\n",
    "    \n",
    "    with tf.variable_scope('mnist_net', reuse=False):\n",
    "        images = tf.layers.flatten(images)\n",
    "        images = tf.layers.dense(images, HIDDEN_SIZE, activation=tf.nn.relu)\n",
    "        images = tf.layers.dropout(images, rate=DROPOUT, training=True)\n",
    "        images = tf.layers.dense(images, NUM_CLASSES, activation=tf.nn.softmax)\n",
    "\n",
    "    logits_train = images\n",
    "    loss_op = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(\n",
    "        logits=logits_train, labels=labels))\n",
    "    train_step = tf.train.AdamOptimizer().minimize(loss_op)\n",
    "\n",
    "    correct_pred = tf.equal(\n",
    "            tf.argmax(logits_train, 1), tf.argmax(labels, 1))\n",
    "    accuracy = tf.reduce_mean(tf.cast(correct_pred, tf.float32))"
   ]
  },
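  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The label handling and accuracy computation above can be illustrated without TensorFlow. This is a sketch in plain Python, with lists in place of tensors and hypothetical helper names, of what `tf.one_hot`, `tf.argmax`, and the mean over `correct_pred` compute.\n",
    "\n",
    "```python\n",
    "def one_hot(label, num_classes):\n",
    "    # Vector of zeros with a single 1.0 at the label index\n",
    "    return [1.0 if i == label else 0.0 for i in range(num_classes)]\n",
    "\n",
    "\n",
    "def argmax(values):\n",
    "    # Index of the largest value (the first one on ties)\n",
    "    return max(range(len(values)), key=lambda i: values[i])\n",
    "\n",
    "\n",
    "def batch_accuracy(predictions, one_hot_labels):\n",
    "    # Fraction of samples whose predicted class matches the label\n",
    "    hits = [argmax(p) == argmax(l)\n",
    "            for p, l in zip(predictions, one_hot_labels)]\n",
    "    return sum(hits) / len(hits)\n",
    "\n",
    "\n",
    "# Two samples, three classes: the first prediction is right, the second is not\n",
    "example_labels = [one_hot(2, 3), one_hot(0, 3)]\n",
    "example_predictions = [[0.1, 0.2, 0.7], [0.3, 0.6, 0.1]]\n",
    "print(batch_accuracy(example_predictions, example_labels))\n",
    "```"
   ]
  },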
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "With `tf.Session` we can run this model and train it on the GPU."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Step 0, accuracy: 0.109375\n",
      "Step 100, accuracy: 0.828125\n",
      "Step 200, accuracy: 0.828125\n",
      "Step 300, accuracy: 0.96875\n",
      "Step 400, accuracy: 0.890625\n",
      "Final accuracy:  0.90734375\n"
     ]
    }
   ],
   "source": [
    "with tf.Session() as sess:\n",
    "        sess.run(tf.global_variables_initializer())\n",
    "        sess.run(iterator.initializer)\n",
    "\n",
    "        for i in range(EPOCHS * ITERATIONS_PER_EPOCH):\n",
    "            sess.run(train_step)\n",
    "            if i % ITERATIONS_PER_EPOCH == 0:\n",
    "                train_accuracy = sess.run(accuracy)\n",
    "                print(\"Step %d, accuracy: %g\" % (i, train_accuracy))\n",
    "\n",
    "        final_accuracy = 0\n",
    "        for _ in range(ITERATIONS_PER_EPOCH):\n",
    "            final_accuracy = final_accuracy + sess.run(accuracy)\n",
    "        final_accuracy = final_accuracy / ITERATIONS_PER_EPOCH\n",
    "\n",
    "        print('Final accuracy: ', final_accuracy)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.9"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
